package bigdata.jobclean.test

import bigdata.jobclean.WordCount2
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import scala.collection.mutable
/** Empty companion class of the [[OtherClean]] object.
  * No instance state or behavior — all logic lives in the companion object;
  * this class appears to be an IDE-generated placeholder.
  */
class OtherClean {

}
/*
  Test job: reclassify records whose category (cate2) is `其他` ("Other")
  into a more accurate category, using per-category word-frequency data.
 */


object OtherClean {

  /** cate2 category -> characteristic words for that category.
    * Populated on the driver in [[main]] before the predict UDF runs.
    * Mutable map kept for interface compatibility; written once, then read-only.
    */
  val cate_words: collection.mutable.Map[String, Array[String]] = collection.mutable.Map.empty

  /**
    * Predict the most likely cate2 category for a raw job-detail string.
    *
    * The detail text is tokenised via `WordCount2.word_split` and every known
    * category in [[cate_words]] is scored by how many of its characteristic
    * words occur among the tokens; the highest-scoring category wins.
    * (The original similarity helper `jobGuess.getSimilar` was commented out,
    * leaving the loop dead; word-overlap count restores the intended behavior.)
    *
    * @param detail raw job description text
    * @return `"<category> (<score>)"`; `"未知 (0)"` when no category matches
    */
  def predict(detail: String): String = {
    // Tokenise the raw description (project helper; assumed to yield a
    // collection of strings — TODO confirm return type of word_split).
    val inputWords = WordCount2.word_split(detail)
    val inputSet = inputWords.toSet
    var bestCate = "未知"
    var bestScore = 0
    // Keep the category with the largest word overlap.
    for ((cate2, words) <- cate_words) {
      val score = words.count(inputSet.contains)
      if (score > bestScore) {
        bestCate = cate2
        bestScore = score
      }
    }
    s"$bestCate ($bestScore)"
  }

  /** Load the per-job word-frequency data.
    * TODO: never implemented in the original source; the loading currently
    * happens inline in [[main]]. Kept as a stub for interface compatibility.
    */
  def loadWc(): Unit = {

  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]").appName("jobClean").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")
    val udf_predict = udf(predict _)

    // Word-frequency data: one word list per (cate1, cate2) pair.
    val wc = spark.read.option("header", "true").csv("result/cate_wc/*")
      .groupBy("cate1", "cate2")
      .agg(collect_list("word").alias("words"))

    // Materialise (cate2 -> words) on the driver so the UDF can use it.
    wc.collect().foreach { row =>
      val cate2 = row.getAs[String]("cate2")
      val words: mutable.WrappedArray[String] = row.getAs("words")
      cate_words.update(cate2, words.toArray[String])
    }

    // All columns are read as plain strings.
    val schema = StructType(
      Array("url", "name", "salary", "province", "city", "exp", "edu", "num",
            "pubtime", "cname", "ctype", "ctrade", "cnum", "cate1", "cate2",
            "welfare", "detail")
        .map(name => StructField(name, DataTypes.StringType)))

    // Read the cleaned raw data.
    val data = spark.read.schema(schema).csv("result/cleaned/*")

    // Keep only rows categorised as "其他" (Other). A column expression is
    // used instead of a row lambda so Spark can push the filter down, then
    // the predicted category is attached from the detail text.
    val others = data
      .filter(col("cate2") === "其他")
      .withColumn("predict", udf_predict(col("detail")))

    println("开始写出")
    others.select("name", "predict", "detail").write.option("header", "true").csv("result/predict")
    println("写出完成..")
  }
}